package com.chb.userbehavioranalysis.order

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector


/**
 * Flink job that matches order events with receipt (payment-received) events on their
 * transaction id, using an event-time interval join.
 *
 * Order events are read from the classpath resource `/OrderLog.csv`. Receipt events are read
 * line by line from a text socket at `localhost:8888`. Every matched `(order, receipt)` pair is
 * printed to stdout.
 */
object TxMatchByJoin {
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // Brings the implicit TypeInformation instances needed by the Scala DataStream API into scope.
        import org.apache.flink.streaming.api.scala._
        env.setParallelism(1)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

        // Order event stream: CSV lines parsed into OrderEvent, keyed by transaction id.
        // Fail with a clear message instead of an NPE if the resource is missing from the classpath.
        val orderLogPath = Option(getClass.getResource("/OrderLog.csv"))
            .map(_.getPath)
            .getOrElse(throw new IllegalStateException("Classpath resource /OrderLog.csv not found"))
        val orderEventStream = env.readTextFile(orderLogPath)
            .map(line => {
                val fields = line.split(",")
                OrderEvent(fields(0).trim.toLong, fields(1).trim, fields(2).trim, fields(3).trim.toLong)
            })
            // Events without a transaction id have nothing to join on.
            .filter(_.txId != "")
            // eventTime is scaled by 1000; presumably it is stored in seconds and Flink expects milliseconds.
            .assignAscendingTimestamps(_.eventTime * 1000L)
            .keyBy(_.txId)

        // Receipt event stream, read from a socket for interactive testing.
        // To replay the bundled file instead, use:
        //   env.readTextFile(getClass.getResource("/ReceiptLog.csv").getPath)
        val receiptEventStream = env.socketTextStream("localhost", 8888)
            .map(line => {
                val fields = line.split(",")
                // Trim every field, including the timestamp, so that "a, b, 123" parses like "a,b,123".
                // This matches how order events are parsed.
                ReceiptEvent(fields(0).trim, fields(1).trim, fields(2).trim.toLong)
            })
            .assignAscendingTimestamps(_.eventTime * 1000L)
            .keyBy(_.txId)

        // Interval join on txId: an order is paired with each receipt whose event time lies
        // between 5 seconds before and 5 seconds after the order's event time.
        val processedStream = orderEventStream.intervalJoin(receiptEventStream)
            .between(Time.seconds(-5), Time.seconds(5))
            .process(new TxPayMatchByJoin())

        processedStream.print()

        env.execute("tx pay match by join job")
    }
}

/**
 * Join function used by the interval join in [[TxMatchByJoin]].
 *
 * Each (order, receipt) pair delivered by the join is emitted unchanged as a tuple
 * `(order, receipt)`. No filtering or transformation is applied, and the join context is not used.
 */
class TxPayMatchByJoin() extends ProcessJoinFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)] {
    override def processElement(
        left: OrderEvent,
        right: ReceiptEvent,
        ctx: ProcessJoinFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context,
        out: Collector[(OrderEvent, ReceiptEvent)]
    ): Unit = {
        val matchedPair: (OrderEvent, ReceiptEvent) = left -> right
        out.collect(matchedPair)
    }
}
